阶段 A:数据基础设施

本阶段为量化中台的地基,目标是建立统一的时序数据存储、标准化采集框架、查询API和管理界面。


总览

子任务 内容 优先级
A1 TimescaleDB 时序数据库 P0 - 第一步
A2 Connector 数据采集框架 P0 - 第二步
A5 数据查询 API P0 - 第三步
A6 Dashboard 数据管理页 P1 - 第四步
A3 数据调度器 P1 - 第五步
A4 数据质量监控 P1 - 第六步

A1. TimescaleDB 时序数据库

目标

安装 PostgreSQL + TimescaleDB 扩展,替代 zip 文件存储,建立 4 层数据模型。

数据层级

层级 名称 说明 表名
L1 行情 OHLCV + VWAP market_ohlcv
L2 Flow 期权流、大单、暗池 flow_data
L3 Info 新闻、公告、情绪 info_data
L4 另类 社媒、链上、另类信号 alt_data

核心表结构

market_ohlcv (L1 行情)

CREATE TABLE market_ohlcv (
    time        TIMESTAMPTZ NOT NULL,
    symbol      TEXT NOT NULL,
    source      TEXT NOT NULL,
    timeframe   TEXT NOT NULL,        -- '1d', '1h', '5m', '1m'
    open        FLOAT8,
    high        FLOAT8,
    low         FLOAT8,
    close       FLOAT8,
    volume      BIGINT,
    vwap        FLOAT8
);
SELECT create_hypertable('market_ohlcv', 'time');
CREATE INDEX idx_ohlcv_symbol_time ON market_ohlcv (symbol, time DESC);

flow_data (L2 资金流)

CREATE TABLE flow_data (
    time        TIMESTAMPTZ NOT NULL,
    symbol      TEXT NOT NULL,
    source      TEXT NOT NULL,
    flow_type   TEXT,                 -- 'option', 'darkpool', 'block'
    side        TEXT,                 -- 'call', 'put', 'buy', 'sell'
    premium     FLOAT8,
    volume      BIGINT,
    strike      FLOAT8,
    expiry      DATE,
    raw_data    JSONB
);
SELECT create_hypertable('flow_data', 'time');

info_data (L3 信息)

CREATE TABLE info_data (
    time        TIMESTAMPTZ NOT NULL,
    source      TEXT NOT NULL,
    category    TEXT,                 -- 'news', 'earnings', 'sec_filing'
    symbol      TEXT,
    headline    TEXT,
    sentiment   FLOAT8,              -- -1.0 ~ +1.0
    confidence  FLOAT8,              -- 0.0 ~ 1.0
    raw_data    JSONB
);
SELECT create_hypertable('info_data', 'time');

alt_data (L4 另类数据)

CREATE TABLE alt_data (
    time          TIMESTAMPTZ NOT NULL,
    source        TEXT NOT NULL,
    category      TEXT,              -- 'social', 'onchain', 'satellite'
    symbol        TEXT,
    signal_value  FLOAT8,
    metadata      JSONB
);
SELECT create_hypertable('alt_data', 'time');

data_sources (数据源注册表)

CREATE TABLE data_sources (
    id         SERIAL PRIMARY KEY,
    name       TEXT UNIQUE NOT NULL,
    layer      TEXT NOT NULL,        -- 'L1', 'L2', 'L3', 'L4'
    api_type   TEXT,                 -- 'rest', 'websocket', 'file'
    config     JSONB DEFAULT '{}',
    enabled    BOOLEAN DEFAULT true,
    last_sync  TIMESTAMPTZ,
    status     TEXT DEFAULT 'idle'   -- 'idle', 'syncing', 'error', 'ok'
);

文件清单

文件 说明
src/data/__init__.py 包初始化
src/data/database.py SQLAlchemy async 连接池 + 表定义
src/data/models.py Pydantic 模型(请求/响应)
scripts/init_db.py 初始化 TimescaleDB + 建表
scripts/migrate_yfinance.py 迁移现有 9 个 symbol 日线到数据库

迁移数据

现有 9 个 symbol 的日线数据(1998-2026)将从 yfinance CSV/zip 迁移到 market_ohlcv 表:


A2. Connector 数据采集框架

目标

统一的数据源接口,每个源一个 Connector,支持拉取和实时流。

BaseConnector 接口

from abc import ABC, abstractmethod
from typing import AsyncIterator

class BaseConnector(ABC):
    source_name: str
    layer: str  # 'L1', 'L2', 'L3', 'L4'

    @abstractmethod
    async def fetch(self, symbols: list[str], start: str, end: str) -> list[dict]:
        """批量拉取历史数据"""
        ...

    async def stream(self, symbols: list[str]) -> AsyncIterator[dict]:
        """实时数据流(可选实现)"""
        raise NotImplementedError

    @abstractmethod
    def transform(self, raw: dict) -> list[dict]:
        """原始数据 → 标准化格式"""
        ...

    async def validate(self, data: list[dict]) -> list[dict]:
        """数据校验,过滤无效记录"""
        return [d for d in data if self._is_valid(d)]

Connector 列表

Connector 层级 状态 文件
yfinance L1 ✅ 本次实现 yfinance_conn.py
Unusual Whales L2 ⏳ 待接入 unusual_whales.py
FlowAlgo L2 ⏳ 待接入 flowalgo.py
Polymarket L4 ⏳ 待接入 polymarket.py

文件清单

文件 说明
src/data/connectors/__init__.py 包初始化 + 注册表
src/data/connectors/base.py BaseConnector 抽象类
src/data/connectors/yfinance_conn.py yfinance 数据源实现

A3. 数据调度器

目标

基于 APScheduler 的定时采集调度,按数据层级设置不同频率。

调度策略

层级 频率 触发时间
L1 日线 每日 收盘后 16:30 ET
L1 分钟线 每5分钟 交易时段 9:30-16:00 ET
L2 Flow 每分钟 交易时段
L3 Info 每15分钟 全天
L4 另类 按源特性 自定义

文件


A4. 数据质量监控

检查维度

维度 说明 阈值
完整性 缺失交易日检测 缺失率 < 1%
异常值 价格跳变、量能突变 3σ 标准差
延迟 数据到达延迟 < 5分钟 (L1日线)
一致性 跨源数据对比 偏差 < 0.1%

文件


A5. 数据查询 API

端点列表

方法 路径 说明
GET /api/v1/data/ohlcv 查询行情数据
GET /api/v1/data/flow 查询 Flow 数据
GET /api/v1/data/sources 数据源列表及状态
POST /api/v1/data/sync/{source} 手动触发同步
GET /api/v1/data/quality 数据质量报告

参数示例

GET /api/v1/data/ohlcv?symbol=SPY&start=2025-01-01&end=2025-12-31&timeframe=1d

Response:
{
  "symbol": "SPY",
  "timeframe": "1d",
  "count": 252,
  "data": [
    {"time": "2025-01-02T00:00:00Z", "open": 470.5, "high": 472.3, ...},
    ...
  ]
}

文件


A6. Dashboard 数据管理页

页面路径

/data — 数据管理页

功能模块

  1. 数据源状态卡片 — 名称、层级、最后同步时间、记录数、状态灯(绿/黄/红)
  2. 数据覆盖热力图 — symbol × 日期矩阵,颜色深浅表示数据完整度
  3. 手动同步按钮 — 触发指定数据源立即同步
  4. 数据质量指标面板 — 完整性、异常值、延迟等指标可视化

文件

文件 说明
dashboard/src/pages/DataManager.jsx 数据管理页组件
dashboard/src/api/client.js 增加 data API 函数
dashboard/src/App.jsx 增加 /data 路由
dashboard/src/components/Sidebar.jsx 增加 Data 导航项

实施顺序

A1 TimescaleDB 建库建表
 └──→ A2 Connector 框架
       └──→ A5 数据查询 API
             └──→ A6 Dashboard 页面
                   └──→ A3 数据调度器
                         └──→ A4 数据质量监控

新增依赖

sqlalchemy[asyncio]>=2.0
asyncpg>=0.29
psycopg2-binary>=2.9
apscheduler>=3.10

验证清单


最后更新: 2026-03-01